package com.example.dobs.demo.flink.io;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

public class TestwithBroadcastSet {
    public static void main(String[] args) {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        ArrayList<Tuple2<Integer, String>> list = new ArrayList<>();
        list.add(new Tuple2<>(1, "大胖"));
        list.add(new Tuple2<>(2, "二狗子"));
        list.add(new Tuple2<>(3, "三狗子"));
        list.add(new Tuple2<>(4, "小贵子"));
        list.add(new Tuple2<>(5, "小丸子"));
        ArrayList<Tuple3<Integer, String, Integer>> list2 = new ArrayList<>();
        list2.add(new Tuple3<>(1, "语文", 50));
        list2.add(new Tuple3<>(2, "数学", 70));
        list2.add(new Tuple3<>(3, "英文", 86));
        list2.add(new Tuple3<>(4, "化学", 86));
        list2.add(new Tuple3<>(5, "物理", 86));

        DataStreamSource<Tuple2<Integer, String>> student = see.fromCollection(list);
        DataStreamSource<Tuple3<Integer, String, Integer>> score = see.fromCollection(list2);

        student.broadcast();

        SingleOutputStreamOperator<Tuple3<String, String, Integer>> bc_student = score.map(new RichMapFunction<Tuple3<Integer, String, Integer>, Tuple3<String, String, Integer>>() {
            List<Tuple2<Integer, String>> bc_student = null;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                bc_student = getRuntimeContext().getBroadcastVariable("bc_student");
            }

            @Override
            public Tuple3<String, String, Integer> map(Tuple3<Integer, String, Integer> value) throws Exception {
                int stu_id = value.f0;
                for (Tuple2<Integer, String> tuple2 : bc_student) {
                    if (tuple2.f0 == stu_id) {
                        return new Tuple3<String, String, Integer>(tuple2.f1, value.f1, value.f2);
                    }
                }
                return null;
            }
        });


    }
}
